package sparkSinkToHdfs;

import kafka.serializer.StringDecoder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Output format that lets Spark keep adding new files to an HDFS directory that already exists.
 *
 * <p>The stock {@link org.apache.hadoop.mapred.FileOutputFormat#checkOutputSpecs} rejects an
 * output directory that already exists. That would make every streaming batch after the first
 * fail. This subclass skips that existence check. It also gives each output file a timestamped
 * name that includes the framework-supplied leaf name, so partitions and batches do not write to
 * the same path.
 *
 * @author jiantao7
 * @create 2018-05-08 18:50
 */
public class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<Object, Object> {

    /**
     * Pattern for the timestamp prefix of every output file. Millisecond precision keeps batches
     * that run within the same second apart.
     */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd-HHmmss-SSS";

    /**
     * Time at which this instance was created, formatted with {@link #TIMESTAMP_PATTERN}. It is
     * computed once, so all records routed through this instance share the same prefix.
     */
    private final String dateString =
            new SimpleDateFormat(TIMESTAMP_PATTERN).format(System.currentTimeMillis());

    /**
     * Returns the output file name for a record. The record's key and value are ignored.
     *
     * <p>The leaf name handed in by {@code MultipleOutputFormat} is part of the result. Without
     * it, every partition of a batch would be assigned the same file name. Because all tasks
     * commit into one shared directory, one partition's file could then replace another's.
     *
     * @param key   record key (unused)
     * @param value record value (unused)
     * @param name  leaf file name supplied by the framework for the current task
     * @return {@code <timestamp>_<name>_spark_kafka.txt}
     */
    @Override
    protected String generateFileNameForKeyValue(Object key, Object value, String name) {
        return dateString + "_" + name + "_spark_kafka.txt";
    }

    /**
     * Validates and normalizes the job's output directory without rejecting one that already exists.
     *
     * <p>This mirrors {@link org.apache.hadoop.mapred.FileOutputFormat#checkOutputSpecs} except for
     * its final "output directory already exists" check. That check is intentionally omitted so
     * that successive streaming batches can write into the same directory.
     *
     * @param ignored unused; the file system is resolved from the output path instead
     * @param job     job configuration holding the output directory
     * @throws InvalidJobConfException if no output directory is set and the job has reduce tasks
     * @throws IOException             if the output file system cannot be accessed or delegation
     *                                 tokens cannot be obtained
     */
    @Override
    public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
        Path outDir = getOutputPath(job);

        if (outDir == null && job.getNumReduceTasks() != 0) {
            throw new InvalidJobConfException("Output directory not set in JobConf.");
        }

        if (outDir != null) {
            FileSystem fileSystem = outDir.getFileSystem(job);
            // Write the fully qualified path back into the job so later stages see scheme/authority.
            outDir = fileSystem.makeQualified(outDir);
            setOutputPath(job, outDir);
            // Obtain delegation tokens for the output file system.
            TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] {outDir}, job);
            // Deliberately no "output directory exists" check: each batch appends new files.
        }
    }

    /**
     * Streaming driver that reads messages from Kafka with the direct-stream API. It appends every
     * non-empty micro-batch to an HDFS directory using {@link RDDMultipleTextOutputFormat}.
     */
    public static class KafkaDirecReadSinkToHdfs {

        /** Kafka broker list used when none is given on the command line. */
        private static final String DEFAULT_BROKER_LIST = "10.112.80.6:9092";

        /** Topic consumed when none is given on the command line. */
        private static final String DEFAULT_TOPICS = "kafka_flume_hdfs1";

        /** Output directory used when none is given on the command line. */
        private static final String DEFAULT_OUTPUT_PATH = "hdfs://cdhmanager:8020/flumedata/spark";

        /**
         * Starts the streaming job and blocks until it terminates.
         *
         * @param args optional positional overrides {@code [brokerList [topics [outputPath]]]};
         *             {@code topics} may be a comma-separated list. Missing arguments fall back
         *             to the defaults declared in this class.
         * @throws InterruptedException if interrupted while waiting for termination
         */
        public static void main(String[] args) throws InterruptedException {
            String brokerList = args.length > 0 ? args[0] : DEFAULT_BROKER_LIST;
            String topicList = args.length > 1 ? args[1] : DEFAULT_TOPICS;
            final String outputPath = args.length > 2 ? args[2] : DEFAULT_OUTPUT_PATH;

            SparkConf conf = new SparkConf()
                    .setAppName("KafkaDirectWordCount")
                    // Default to local mode, but let a master set externally (e.g. spark-submit) win.
                    .setIfMissing("spark.master", "local[*]");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

            try {
                // Kafka consumer parameters.
                Map<String, String> kafkaParams = new HashMap<String, String>();
                kafkaParams.put("metadata.broker.list", brokerList);

                // Topics to read; the direct stream can consume several topics in parallel.
                Set<String> topics = new HashSet<String>();
                for (String topic : topicList.split(",")) {
                    String trimmed = topic.trim();
                    if (!trimmed.isEmpty()) {
                        topics.add(trimmed);
                    }
                }

                // Input DStream of (key, message) pairs.
                JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topics);

                lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
                    @Override
                    public void call(JavaPairRDD<String, String> batch) throws Exception {
                        // Skip empty batches instead of launching a save job for nothing.
                        if (!batch.isEmpty()) {
                            batch.saveAsHadoopFile(outputPath, Text.class, Text.class,
                                    RDDMultipleTextOutputFormat.class);
                        }
                    }
                });

                jssc.start();
                jssc.awaitTermination();
            } finally {
                // Stop the streaming context even if awaitTermination throws.
                jssc.close();
            }
        }
    }
}